Skip to content

Add StatisticsContext parameter to partition_statistics#21815

Open
asolimando wants to merge 11 commits into
apache:mainfrom
asolimando:asolimando/partition-statistics-context
Open

Add StatisticsContext parameter to partition_statistics#21815
asolimando wants to merge 11 commits into
apache:mainfrom
asolimando:asolimando/partition-statistics-context

Conversation

@asolimando

@asolimando asolimando commented Apr 23, 2026

Copy link
Copy Markdown
Member

Which issue does this PR close?

Closes #20184

Rationale for this change

ExecutionPlan::partition_statistics forces each operator to re-fetch child statistics internally, causing redundant subtree walks in deep plans.

What changes are included in this PR?

  • Deprecate partition_statistics in favor of statistics_with_args(&self, args: &StatisticsArgs), an extensible signature that won't require downstream churn when new parameters are added
  • StatisticsArgs carries the partition index and a shared per-call StatsCache, eliminating redundant subtree walks within a single compute_statistics call
  • Child stats are pre-computed with partition=None and cached; operators look them up via args.child_stats_of(child) (overall) or args.child_stats_for(child) (partition-aware)
  • Criterion micro-benchmark on three plan shapes from [EPIC] Improve query planning speed #19795

Tests

Existing tests pass unchanged. New unit test verifies the caching contract.

Test plan

  • cargo fmt --all
  • cargo clippy --all-targets --all-features -- -D warnings
  • cargo test --profile ci --all-features on affected crates
  • Criterion benchmark: ~26x (coalesce chain), ~5x (cross-join tree), ~25x (filter chain) speedup

Disclaimer: I used AI to assist in the code generation, I have manually reviewed the output and it matches my intention and understanding.

@github-actions github-actions Bot added documentation Improvements or additions to documentation optimizer Optimizer rules core Core DataFusion crate datasource Changes to the datasource crate physical-plan Changes to the physical-plan crate labels Apr 23, 2026
@asolimando

Copy link
Copy Markdown
Member Author

Hi @xudong963, I have opened the PR as a prerequisite for #21122, as discussed.

This is a breaking change and I therefore added a section under .../library-user-guide/upgrading/54.0.0.md‎, I have checked around what usually goes there, but I'd appreciate if you could take a deeper look and confirm if I captured what's expected for the update guide.

Looking forward to your feedback!

@xudong963

Copy link
Copy Markdown
Member

@asolimando thanks, I'll review it next Monday! /cc @jonathanc-n

@asolimando

Copy link
Copy Markdown
Member Author

@asolimando thanks, I'll review it next Monday! /cc @jonathanc-n

Gentle reminder @xudong963 :)

@xudong963 xudong963 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@asolimando thanks! I'm sorry that I'm busy with others this week.

This PR doesn't fully solve the problem it claims to. The stated goal in the PR description and #20184 is to eliminate exponential recomputation. But for any plan containing a CoalescePartitionsExec, SortPreservingMergeExec, RepartitionExec, HashJoinExec (CollectLeft/Auto), CrossJoinExec, or NestedLoopJoinExec — which is most non-trivial plans — the operator restarts a fresh bottom-up walk from inside its own partition_statistics IIUC. So the recomputation isn't gone;

Caching sounds good, how about making caching part of StatisticsContext from day one, then we can have some benchmarks to show off the gains which will be easier for the community to accept the PR, wdyt?

Comment thread datafusion/physical-plan/src/execution_plan.rs Outdated
Comment thread datafusion/physical-plan/src/statistics_context.rs Outdated
@asolimando

Copy link
Copy Markdown
Member Author

@asolimando thanks! I'm sorry that I'm busy with others this week.

This PR doesn't fully solve the problem it claims to. The stated goal in the PR description and #20184 is to eliminate exponential recomputation. But for any plan containing a CoalescePartitionsExec, SortPreservingMergeExec, RepartitionExec, HashJoinExec (CollectLeft/Auto), CrossJoinExec, or NestedLoopJoinExec — which is most non-trivial plans — the operator restarts a fresh bottom-up walk from inside its own partition_statistics IIUC. So the recomputation isn't gone;

Caching sounds good, how about making caching part of StatisticsContext from day one, then we can have some benchmarks to show off the gains which will be easier for the community to accept the PR, wdyt?

Thank you for your input @xudong963, no need to apologies, it's understandable!

You raise a fair point, we fully avoid the recomputation only for linear plans, but operators that call compute_statistics(child, None) internally don't benefit. This is noted in the "What remains for follow-up" section but I agree it might not be enough for the first iteration, and I anyway should have marked "partially closes #20184".

Re. the cache, I identified the need for the StatisticsRegistry already, and we discussed with @kosiew in the related PR (#21483, comment, branch asolimando/statistics-planner-with-statscache-v2). We agreed to defer it to limit scope, but this is the right place to discuss it.

One limitation I identified on the StatsCache (as I called it there), is around the cache key, which should "identify" an ExecutionPlan, which doesn't have any stable id other than its memory pointer ( so the cache key is effectively (Arc::as_ptr, partition)), but I am concerned of nodes being disposed (and re-used).

Cache lifecycle/scope:

  1. single invocation of compute_statistics (as described in Let partition_statistics accept pre-computed children statistics #20184): if we agree on this, then the concern is not valid, as the plan tree is "stable" during the lifetime. When e.g. CoalescePartitionsExec calls compute_statistics(child, None) internally, the cache already has the subtree results, fully eliminating redundant walks.

  2. multiple invocations of compute_statistics (same rule or cross-rules): here we necessarily need a stable node ID and we can't rely on the pointer, since nodes can be dropped/recreated

The scope of #20184 is, in my understanding, 1. (single walk), if you agree with that, I plan to use (Arc::as_ptr, partition) as cache key, and introducing node IDs and expanding the cache lifetime IMO be tackled as a followup (I can create issues for that, if the direction is confirmed), as with this solution we should already see computational benefits.

Re. benchmarks, do you have a specific workload in mind (e.g., TPC-DS, Q99)? Also, could I be added to the allowlist to trigger benchmark runs so I can iterate without requiring manual re-runs, in case I need multiple iterations?

WDYT?

@xudong963

Copy link
Copy Markdown
Member

Thanks for the thoughtful response @asolimando — the framing is exactly right, and the prior discussion with @kosiew in #21483 is helpful context.

On scope: agreed, let's land per-call caching in this PR (your Option 1) and treat cross-call caching with stable node IDs as a follow-up. Could you open an issue for Option 2 so we don't lose track?

On the cache key: (Arc::as_ptr, partition) is safe within a single synchronous compute_statistics walk — the Arcs are held by the plan tree and can't be dropped during the call, so pointer reuse isn't a concern. Good call.

On benchmarks: I'd avoid full TPC-DS Q99 — statistics computation is a small fraction of total query time and will get lost in noise. A targeted micro-bench is more informative:

  • Build a deeply nested plan (e.g., a 10+ deep UnionExec chain, or a chain of hash joins + repartitions) and time compute_statistics(plan, None) before/after this PR.
  • Optionally reuse a reproducer from [EPIC] Improve query planning speed #19795 (planning-speed EPIC) since deep plans are exactly that issue's pain point.

That should cleanly demonstrate the gain.

@asolimando

Copy link
Copy Markdown
Member Author

Thanks for the thoughtful response @asolimando — the framing is exactly right, and the prior discussion with @kosiew in #21483 is helpful context.

On scope: agreed, let's land per-call caching in this PR (your Option 1) and treat cross-call caching with stable node IDs as a follow-up. Could you open an issue for Option 2 so we don't lose track?

On the cache key: (Arc::as_ptr, partition) is safe within a single synchronous compute_statistics walk — the Arcs are held by the plan tree and can't be dropped during the call, so pointer reuse isn't a concern. Good call.

On benchmarks: I'd avoid full TPC-DS Q99 — statistics computation is a small fraction of total query time and will get lost in noise. A targeted micro-bench is more informative:

  • Build a deeply nested plan (e.g., a 10+ deep UnionExec chain, or a chain of hash joins + repartitions) and time compute_statistics(plan, None) before/after this PR.
  • Optionally reuse a reproducer from [EPIC] Improve query planning speed #19795 (planning-speed EPIC) since deep plans are exactly that issue's pain point.

That should cleanly demonstrate the gain.

Thanks for the confirmation and the clarifications, I will hopefully get to it early next week and I will ping you back as soon as I will have some updates!

@asolimando asolimando force-pushed the asolimando/partition-statistics-context branch from e135e8a to a8a3d6c Compare May 3, 2026 18:48
@github-actions

github-actions Bot commented May 3, 2026

Copy link
Copy Markdown

Thank you for opening this pull request!

Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

Details
     Cloning apache/main
    Building datafusion v54.0.0 (current)
       Built [ 119.257s] (current)
     Parsing datafusion v54.0.0 (current)
      Parsed [   0.037s] (current)
    Building datafusion v54.0.0 (baseline)
       Built [  98.700s] (baseline)
     Parsing datafusion v54.0.0 (baseline)
      Parsed [   0.037s] (baseline)
    Checking datafusion v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.944s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [ 220.823s] datafusion
    Building datafusion-datasource v54.0.0 (current)
       Built [  35.048s] (current)
     Parsing datafusion-datasource v54.0.0 (current)
      Parsed [   0.033s] (current)
    Building datafusion-datasource v54.0.0 (baseline)
       Built [  34.887s] (baseline)
     Parsing datafusion-datasource v54.0.0 (baseline)
      Parsed [   0.034s] (baseline)
    Checking datafusion-datasource v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.352s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [  72.129s] datafusion-datasource
    Building datafusion-ffi v54.0.0 (current)
       Built [  55.819s] (current)
     Parsing datafusion-ffi v54.0.0 (current)
      Parsed [   0.064s] (current)
    Building datafusion-ffi v54.0.0 (baseline)
       Built [  55.983s] (baseline)
     Parsing datafusion-ffi v54.0.0 (baseline)
      Parsed [   0.065s] (baseline)
    Checking datafusion-ffi v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.367s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [ 114.327s] datafusion-ffi
    Building datafusion-physical-optimizer v54.0.0 (current)
       Built [  36.369s] (current)
     Parsing datafusion-physical-optimizer v54.0.0 (current)
      Parsed [   0.023s] (current)
    Building datafusion-physical-optimizer v54.0.0 (baseline)
       Built [  36.751s] (baseline)
     Parsing datafusion-physical-optimizer v54.0.0 (baseline)
      Parsed [   0.024s] (baseline)
    Checking datafusion-physical-optimizer v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.157s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [  74.457s] datafusion-physical-optimizer
    Building datafusion-physical-plan v54.0.0 (current)
       Built [  34.227s] (current)
     Parsing datafusion-physical-plan v54.0.0 (current)
      Parsed [   0.138s] (current)
    Building datafusion-physical-plan v54.0.0 (baseline)
       Built [  34.476s] (baseline)
     Parsing datafusion-physical-plan v54.0.0 (baseline)
      Parsed [   0.137s] (baseline)
    Checking datafusion-physical-plan v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.868s] 223 checks: 222 pass, 1 fail, 0 warn, 30 skip

--- failure trait_method_marked_deprecated: trait method #[deprecated] added ---

Description:
A trait method is now #[deprecated]. Downstream crates will get a compiler warning when using this method.
        ref: https://doc.rust-lang.org/reference/attributes/diagnostics.html#the-deprecated-attribute
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.48.0/src/lints/trait_method_marked_deprecated.ron

Failed in:
  method partition_statistics in trait datafusion_physical_plan::execution_plan::ExecutionPlan in /home/runner/work/datafusion/datafusion/datafusion/physical-plan/src/execution_plan.rs:97
  method partition_statistics in trait datafusion_physical_plan::ExecutionPlan in /home/runner/work/datafusion/datafusion/datafusion/physical-plan/src/execution_plan.rs:97

     Summary semver requires new minor version: 0 major and 1 minor checks failed
    Finished [  71.029s] datafusion-physical-plan

@asolimando

Copy link
Copy Markdown
Member Author

Hey @xudong963, I've pushed new commits implementing what we discussed (force-pushed to rebase on latest main, but the first two commits (f36ef32, 12a2fc1) are unchanged from the previous push).

A walkthrough of the new commits:

  • f36ef32 adds StatisticsContext parameter to partition_statistics, keeping the old method as deprecated, as required by the API health guidelines
  • b380893 adds partition_statistics_with_context as the new entry point
  • bb09951 adds StatsCache to StatisticsContext, shared across the entire compute_statistics walk
  • 2f843ef adds a Criterion micro-benchmark on two plan shapes from [EPIC] Improve query planning speed #19795:
    • CoalescePartitionsExec chain (depth 50): ~25x speedup over non-shared-cache baseline
    • CrossJoinExec binary tree (depth 7, 128 leaves): ~3x speedup, mirrors physical_many_self_joins from sql_planner.rs
  • a8a3d6c addresses the wasted partition forwarding: compute_statistics_inner now always pre-computes children with partition=None. Partition-preserving operators request per-partition stats on demand via compute_child_statistics, so partition-merging operators use child_stats() directly instead of triggering re-walks

Re. the benchmark: the numbers are from the average of 5 local runs, and they are conservative, as the baseline still benefits from an ephemeral per-walk cache within each re-walk, the true baseline would be no caching at all, and it would show a larger gap. Since this benchmark is new, I couldn't find a better way to show a before/after run. The improvement is clear anyway, but I just wanted to mention it for completeness.

Will open a follow-up issue for cross-call caching with stable node IDs (Option 2) once this lands, as StatsCache exists nowhere at the moment, I am afraid it would be confusing if filed now.

Looking forward to your review!

@asolimando

Copy link
Copy Markdown
Member Author

(rebased on latest main for conflict resolution, mechanical fixes only)

Comment thread datafusion/physical-plan/src/filter.rs Outdated
Comment on lines +585 to +590
let input_stats = match partition {
Some(_) => Arc::unwrap_or_clone(
ctx.compute_child_statistics(self.input.as_ref(), partition)?,
),
None => Arc::unwrap_or_clone(Arc::clone(&ctx.child_stats()[0])),
};

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per-operator boilerplate is repetitive and bug-prone. Almost every partition-preserving operator now contains:

let stats = match partition {
    Some(_) => ctx.compute_child_statistics(self.input.as_ref(), partition)?,
    None => Arc::clone(&ctx.child_stats()[0]),
};

This should be a helper on the context: ctx.child_stats_for(0, self.input.as_ref(), partition) or similar. Five identical match blocks across FilterExec, CoalesceBatchesExec, BufferExec, CooperativeExec, OutputRequirementExec is five places to make the same mistake when the contract evolves.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we added a StatisticsArgs structure as I proposed above, we could perhaps have this as a method on StatisticsArgs

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes total sense and, as suggested, StatisticsArgs proved to be a good location for this. We now have:

  • args.child_stats_for(self.input.as_ref()), which replaces the match block across all partition-preserving operators
  • args.child_stats_of(child), for partition-merging operators

Addressed in bc32cf2

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also worth measuring on a deep FilterExec chain queried at Some(0). (The context is if you ask compute_statistics(plan, Some(0)) on a deep filter chain, the framework first walks the entire tree computing None stats, then each filter turns around and asks for Some(0) stats on demand (which triggers another cached walk). The shared cache makes the second walk cheap, but for partition-preserving plans we end up populating both None and Some(p) entries for every node

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Covered in bf43bc7: I have added a FilterExec chain at depths 10/20/50. It shows ~2x cost of per-partition vs overall, and ~25x speedup over non-shared-cache baseline at depth 50.

The 2x cost is expected due to the second walk, and as you were anticipating, the cache still makes it cheap enough.

@xudong963 xudong963 requested review from alamb and gabotechs May 11, 2026 06:59
@alamb

alamb commented May 12, 2026

Copy link
Copy Markdown
Contributor

Thanks for the ping -- I will try and review this shortly. I am totally swamped trying to review multiple 1000+ line PRs (and trying to give them thoughtful reviews and understand the implications)

@alamb alamb left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @asolimando and @xudong963 -- this is looking like good progress. I left some thoughts,.

Comment thread datafusion/physical-plan/src/execution_plan.rs
Comment thread datafusion/physical-plan/src/filter.rs Outdated
Comment on lines +585 to +590
let input_stats = match partition {
Some(_) => Arc::unwrap_or_clone(
ctx.compute_child_statistics(self.input.as_ref(), partition)?,
),
None => Arc::unwrap_or_clone(Arc::clone(&ctx.child_stats()[0])),
};

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we added a StatisticsArgs structure as I proposed above, we could perhaps have this as a method on StatisticsArgs

Comment thread datafusion/physical-plan/src/statistics_context.rs Outdated
Comment thread datafusion/physical-plan/src/statistics_context.rs Outdated
Comment thread datafusion/physical-plan/src/statistics_context.rs Outdated
@asolimando

Copy link
Copy Markdown
Member Author

Thank you @xudong963 and @alamb for your feedback and reviews!

I am off until early next week with limited connectivity but I will get back to you soon, here and in related PRs/issues around statistics.

@alamb

alamb commented May 14, 2026

Copy link
Copy Markdown
Contributor

Thank you @xudong963 and @alamb for your feedback and reviews!

I am off until early next week with limited connectivity but I will get back to you soon, here and in related PRs/issues around statistics.

Sounds good -- thank you.

It will probably be good timing -- we'll get the 54 release out and then we can add these new APIs in 55

@asolimando asolimando force-pushed the asolimando/partition-statistics-context branch from 3d66565 to 53bbf5e Compare May 19, 2026 18:57
@asolimando asolimando force-pushed the asolimando/partition-statistics-context branch 2 times, most recently from 003d1ab to d25e1ad Compare May 22, 2026 23:34
@asolimando

asolimando commented May 22, 2026

Copy link
Copy Markdown
Member Author

@alamb @xudong963, there was another conflict so I had to force push again, since there were a couple of artifact from the previous rebase, I did a new one and reworked the commit by "theme", as it was getting hard to manage.

Current commits:

  • 5511118: deprecate partition_statistics, add statistics_with_args with StatisticsArgs + shared StatsCache, lazy computation, fix SampleExec
  • dfc6f2b: benchmarks for 4 plan shapes: coalesce chain, cross-join tree, filter chain, mixed chain
  • ab6ee6d: upgrade guide (55.0.0.md)
  • 27f3769: rename statistics_context -> statistics, use impl AsRef (per @alamb)
  • 25757b9: remove compute_statistics free function, callers manage StatisticsArgs directly (per @alamb)

@asolimando asolimando force-pushed the asolimando/partition-statistics-context branch from d25e1ad to 25757b9 Compare May 23, 2026 00:04
@github-actions github-actions Bot added the ffi Changes to the ffi crate label May 23, 2026
@asolimando asolimando requested review from alamb and xudong963 May 23, 2026 00:35
@asolimando asolimando force-pushed the asolimando/partition-statistics-context branch 2 times, most recently from f93b452 to b4b8e76 Compare May 28, 2026 19:11
…StatsCache

StatisticsArgs carries partition index and a per-call cache. Operators
look up child stats lazily via compute_child_statistics(child, partition).
…sArgs

Callers now create StatisticsArgs directly and call
plan.statistics_with_args(). The cache is created in StatisticsArgs::new()
and shared through compute_child_statistics calls.
@asolimando asolimando force-pushed the asolimando/partition-statistics-context branch from b4b8e76 to e19b719 Compare May 28, 2026 19:20

@xudong963 xudong963 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

…n-statistics-context

# Conflicts:
#	datafusion/physical-plan/src/execution_plan.rs
#	datafusion/physical-plan/src/filter.rs
#	docs/source/library-user-guide/upgrading/55.0.0.md
@asolimando

Copy link
Copy Markdown
Member Author

@alamb @xudong963: I have fixed the new conflicts, would you be able to take a final look if all looks good to you? Happy to address any remaining concerns.

@alamb

alamb commented Jun 8, 2026

Copy link
Copy Markdown
Contributor

Now that we have released 54.0.0 I have some more time to work on major changes for 55. Checking this one out again

@alamb alamb left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @asolimando and @xudong963 -- I just went over this PR again and I think it looks like a nice step forward.

I have two small suggestions:

However, I am also happy to implement them as their own follow on PRs

Comment on lines +581 to +582
#[expect(deprecated)]
self.partition_statistics(args.partition())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I was thinking we could make it

    fn statistics_with_args(&self, args: &StatisticsArgs) -> Result<Arc<Statistics>> {
      if let Some(idx) = args.partition() {
            // Validate partition index
            let partition_count = self.properties().partitioning.partition_count();
            assert_or_internal_err!(
                idx < partition_count,
                "Invalid partition index: {}, the partition count is {}",
                idx,
                partition_count
            );
        }
        Ok(Arc::new(Statistics::new_unknown(&self.schema())))
    }

(aka literally copy/paste the implementation of partition_statistics inline)

Comment thread datafusion/physical-plan/src/statistics.rs Outdated
Comment thread datafusion/ffi/src/execution_plan.rs Outdated
@2010YOUY01

Copy link
Copy Markdown
Contributor

Thank you, the implementation is neat! I believe it solves the recomputation issue.

I have one idea to improve the API:

Currently, the caching logic is explicitly implemented inside each operator’s statistics computation. We could decouple cache management from the operator-level statistics propagation, so that the implementation is easier to evolve.

The idea would look like this:

Stateless API inside ExecutionPlan

The API inside ExecutionPlan should only take input statistics and compute output statistics. Cache operations should be managed externally.

For example, the operator-level API could look like:

fn statistics_from_inputs(
    &self,
    input_stats: &[Arc<Statistics>],
    partition: Option<usize>,
) -> Result<Arc<Statistics>>;

For a unary operator such as FilterExec, the implementation would only describe the local propagation logic:

fn statistics_from_inputs(
    &self,
    input_stats: &[Arc<Statistics>],
    partition: Option<usize>,
) -> Result<Arc<Statistics>> {
    let input_stats = input_stats[0].as_ref();

    let stats = Self::statistics_helper(
        &self.input.schema(),
        input_stats,
        self.predicate(),
        self.default_selectivity,
    )?;

    Ok(Arc::new(stats.project(self.projection.as_ref())))
}

This keeps the operator implementation focused on the question:

Given the input statistics, how do I derive the output statistics?

It does not need to know whether the input statistics came from a cache, from recursive computation, or from an external statistics provider.

External StatisticsContext layer for cache operations

The cache lookup, recursive child computation, and cache insertion can be handled by an external StatisticsContext:

impl StatisticsContext {
    pub fn compute_statistics(
        &self,
        plan: &Arc<dyn ExecutionPlan>,
        partition: Option<usize>,
    ) -> Result<Arc<Statistics>> {
        if let Some(cached) = self.cache.get(plan, partition) {
            return Ok(cached);
        }

        let input_stats = plan
            .children()
            .iter()
            .map(|child| self.compute_statistics(child, partition))
            .collect::<Result<Vec<_>>>()?;

        let stats = plan.statistics_from_inputs(&input_stats, partition)?;

        self.cache.insert(plan, partition, Arc::clone(&stats));

        Ok(stats)
    }
}

With this structure, each ExecutionPlan only implements local statistics propagation, while StatisticsContext owns traversal and caching.

Reasons

We are still uncertain about the long-term implementation details of statistics propagation, so it is likely that this layer will need frequent changes.

One specific example is cached per-partition statistics. In the future, the cache may need to become more fine-grained. For example, suppose partition[1] already has cached range and ndv, and a new request asks for range, ndv, and histogram. Ideally, we should only compute the missing histogram on demand, instead of recomputing the whole statistics object.

Such requirements would need significant changes to the caching layer. If cache operations are embedded directly inside every ExecutionPlan, then each change to the cache model may require touching many operators.

By moving cache management into an external StatisticsContext, we can evolve the caching strategy independently, without propagating large API changes into every ExecutionPlan.

In short, I think the current implementation solves the recomputation issue, but separating local statistics propagation from cache management may give us a cleaner long-term API.

alamb and others added 3 commits June 15, 2026 11:28
…change

Replace `StatisticsArgs::new(partition)` with a builder-style API:

* `StatisticsArgs::new()` takes no arguments (partition defaults to `None`)
* `with_partition(Some(idx))` / `set_partition(Some(idx))` set the partition

Changing the partition starts a new statistics walk, so the memoization
cache (keyed by raw plan pointer + partition) is now reset when the
partition changes. This prevents entries computed for one walk from
leaking into another, where a since-dropped plan node could share an
address with a new node and produce a stale cache hit.

Update all call sites and add a unit test covering the cache reset.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@asolimando

Copy link
Copy Markdown
Member Author

Thank you @asolimando and @xudong963 -- I just went over this PR again and I think it looks like a nice step forward.

I have two small suggestions:

However, I am also happy to implement them as their own follow on PRs

Thank you @alamb for the approval and for preparing the two PRs, I have cherry-picked both and just added a small follow-up commit to adapt the FFI call sites to the new builder-style API. Apologies for the late reply but I was off last week.

…n-statistics-context

Conflicts:
- aggregates/mod.rs: keep statistics_with_args, pass args.partition() to statistics_inner (upstream added partition arg)
- hash_join/exec.rs: keep StatisticsArgs import + statistics_with_args, drop get_record_batch_memory_size (replaced by RecordBatchMemoryCounter), add missing null_equality arg to (None, _) branch
@asolimando

Copy link
Copy Markdown
Member Author

Thank you, the implementation is neat! I believe it solves the recomputation issue.

I have one idea to improve the API:

Currently, the caching logic is explicitly implemented inside each operator’s statistics computation. We could decouple cache management from the operator-level statistics propagation, so that the implementation is easier to evolve.

Thanks @2010YOUY01, I agree that decoupling the statistics handling from the computation is the proper long-term solution, I have filed #22958 to track this issue, I hope I have correctly summarized your proposal there

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

auto detected api change Auto detected API change core Core DataFusion crate datasource Changes to the datasource crate documentation Improvements or additions to documentation ffi Changes to the ffi crate optimizer Optimizer rules physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Let partition_statistics accept pre-computed children statistics

4 participants